Stream of characters [Aho-Corasick Automata, Trie]

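Time: constructor O(N), all queries O(M); Space: O(T), where N is the total length of the words, M is the number of queried letters, and T is the size of the trie. Difficulty: hard.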

Implement the StreamChecker class as follows:

  • StreamChecker(words): Constructor, init the data structure with the given words.

  • query(letter): returns true if and only if for some k >= 1, the last k characters queried (in order from oldest to newest, including this letter just queried) spell one of the words in the given list.

Example 1:

  • s = StreamChecker(["cd","f","kl"]); # init the dictionary.

  • s.query('a') -> return false

  • s.query('b') -> return false

  • s.query('c') -> return false

  • s.query('d') -> return true, because 'cd' is in the wordlist

  • s.query('e') -> return false

  • s.query('f') -> return true, because 'f' is in the wordlist

  • s.query('g') -> return false

  • s.query('h') -> return false

  • s.query('i') -> return false

  • s.query('j') -> return false

  • s.query('k') -> return false

  • s.query('l') -> return true, because 'kl' is in the wordlist

Constraints:

  • 1 <= len(words) <= 2000

  • 1 <= len(words[i]) <= 2000

  • Words will only consist of lowercase English letters.

  • Queries will only consist of lowercase English letters.

  • The number of queries is at most 40000.

Hints:

  1. Put the words into a trie, and manage a set of pointers within that trie.

Solution

1. Aho-Corasick Automaton (output links are followed at query time)

[36]:
import collections

class AhoNode(object):
    """One state of the Aho-Corasick automaton: a trie node plus its links.

    ``__slots__`` removes the per-instance ``__dict__``. The trie allocates
    one node per distinct pattern prefix, so the saving is multiplied by the
    size of the trie.
    """

    __slots__ = ("children", "indices", "suffix", "output")

    def __init__(self):
        # Outgoing trie edges, letter -> AhoNode. Indexing a missing letter
        # creates the child node on the fly.
        self.children = collections.defaultdict(AhoNode)
        # Positions (in the pattern list) of the patterns ending at this node.
        self.indices = []
        # Suffix link; stays None until the links are built.
        self.suffix = None
        # Output link: next node on the suffix chain that ends a pattern,
        # or None when there is no such node.
        self.output = None
[37]:
class AhoTrie(object):
    """Aho-Corasick automaton that is fed a stream one letter at a time.

    The current state is kept between calls to ``step``, so successive calls
    behave like a single scan over the concatenated letters.
    """

    def __init__(self, patterns):
        """Build the trie for *patterns*, add its links and start at the root."""
        self.__root = self.__create_ac_trie(patterns)
        # The link builder returns the root it was given, which is also the
        # initial state of the walk.
        self.__node = self.__create_ac_suffix_and_output_links(self.__root)

    def step(self, letter):
        """Consume *letter* and return the indices of all patterns ending here.

        The returned list holds positions in the pattern list passed to the
        constructor; it is empty when no pattern ends at this letter.
        """
        state = self.__node
        # Fall back along suffix links until a state has an edge for the
        # letter, or until we run off the root (whose suffix link is None).
        while state and letter not in state.children:
            state = state.suffix
        if state:
            state = state.children[letter]
        else:
            state = self.__root
        self.__node = state
        return self.__get_ac_node_outputs(state)

    def __create_ac_trie(self, patterns):  # Time:  O(n), Space: O(t)
        """Insert every pattern into a fresh trie and return its root."""
        trie_root = AhoNode()
        for index, word in enumerate(patterns):
            tail = trie_root
            for ch in word:
                tail = tail.children[ch]
            # Remember which pattern ends at this node.
            tail.indices.append(index)
        return trie_root

    def __create_ac_suffix_and_output_links(self, root):  # Time:  O(n), Space: O(t)
        """Fill in ``suffix`` and ``output`` on every node below *root*.

        Nodes are visited breadth-first, so the links of every shallower node
        are already set when a node is processed. Returns *root*.
        """
        # Depth-1 nodes fall back straight to the root.
        pending = collections.deque(root.children.values())
        for first in pending:
            first.suffix = root

        while pending:
            parent = pending.popleft()
            for letter, kid in parent.children.items():
                pending.append(kid)
                # Walk the parent's suffix chain to the first state that can
                # be extended by this letter.
                fallback = parent.suffix
                while fallback and letter not in fallback.children:
                    fallback = fallback.suffix
                if fallback:
                    kid.suffix = fallback.children[letter]
                else:
                    kid.suffix = root
                # Point directly at the suffix node if it ends a pattern,
                # otherwise reuse whatever that node points at.
                if kid.suffix.indices:
                    kid.output = kid.suffix
                else:
                    kid.output = kid.suffix.output

        return root

    def __get_ac_node_outputs(self, node):
        """Collect the pattern indices at *node* and along its output links.

        Time is proportional to the number of indices returned (O(z)). For
        this problem it could be O(1) by stopping at the first match instead
        of gathering all of them.
        """
        matched = list(node.indices)
        link = node.output
        while link:
            matched.extend(link.indices)
            link = link.output
        return matched
[38]:
class StreamChecker1(object):
    """Report, letter by letter, whether the stream so far ends with a word."""

    def __init__(self, words):
        """
        Build the automaton for the given words.

        :type words: List[str]
        """
        self.__trie = AhoTrie(words)

    def query(self, letter):  # O(m) times
        """
        Feed one letter; True iff at least one word ends at this letter.

        :type letter: str
        :rtype: bool
        """
        matches = self.__trie.step(letter)
        return bool(matches)
[39]:
# Replay the stream from Example 1: only 'd', 'f' and 'l' complete a word.
s = StreamChecker1(["cd", "f", "kl"])
assert not s.query('a')
assert not s.query('b')
assert not s.query('c')
assert s.query('d')       # 'cd' is in the wordlist
assert not s.query('e')
assert s.query('f')       # 'f' is in the wordlist
assert not s.query('g')
assert not s.query('h')
assert not s.query('i')
assert not s.query('j')
assert not s.query('k')
assert s.query('l')       # 'kl' is in the wordlist

2. Aho-Corasick Automaton (matched pattern lists are precomputed per node)

[40]:
import collections

class AhoNode(object):
    """One state of the Aho-Corasick automaton with a precomputed match list.

    ``__slots__`` removes the per-instance ``__dict__``. The trie allocates
    one node per distinct pattern prefix, so the saving is multiplied by the
    size of the trie.
    """

    __slots__ = ("children", "suffix", "outputs")

    def __init__(self):
        # Outgoing trie edges, letter -> AhoNode. Indexing a missing letter
        # creates the child node on the fly.
        self.children = collections.defaultdict(AhoNode)
        # Suffix link; stays None until the links are built.
        self.suffix = None
        # Positions (in the pattern list) of the patterns ending at this
        # node; link construction appends the suffix node's list to it.
        self.outputs = []
[41]:
class AhoTrie(object):
    """Aho-Corasick automaton whose nodes carry their full match list.

    Compared with following output links at query time, each node here
    stores every pattern index reachable through its suffix chain, so a
    query only has to read one list.
    """

    def __init__(self, patterns):
        """Build the trie for *patterns*, add its links and start at the root."""
        self.__root = self.__create_ac_trie(patterns)
        # The link builder returns the root it was given, which is also the
        # initial state of the walk.
        self.__node = self.__create_ac_suffix_and_output_links(self.__root)

    def step(self, letter):
        """Consume *letter* and return the indices of all patterns ending here.

        The list returned is the node's own ``outputs`` list, not a copy.
        """
        state = self.__node
        # Fall back along suffix links until a state has an edge for the
        # letter, or until we run off the root (whose suffix link is None).
        while state and letter not in state.children:
            state = state.suffix
        if state:
            state = state.children[letter]
        else:
            state = self.__root
        self.__node = state
        # Time: O(z), it would be O(m) if we don't use all the matched patterns
        return state.outputs

    def __create_ac_trie(self, patterns):              # Time: O(n), Space: O(t)
        """Insert every pattern into a fresh trie and return its root."""
        trie_root = AhoNode()
        for index, word in enumerate(patterns):
            tail = trie_root
            for ch in word:
                tail = tail.children[ch]
            # Remember which pattern ends at this node.
            tail.outputs.append(index)
        return trie_root

    def __create_ac_suffix_and_output_links(self, root):  # Time: O(n + p^2), Space: O(t + p^2)
        """Fill in ``suffix`` and extend ``outputs`` on every node below *root*.

        Nodes are visited breadth-first, so a node's suffix target already
        holds its complete match list when it is copied. Returns *root*.
        """
        # Depth-1 nodes fall back straight to the root.
        pending = collections.deque(root.children.values())
        for first in pending:
            first.suffix = root

        while pending:
            parent = pending.popleft()
            for letter, kid in parent.children.items():
                pending.append(kid)
                # Walk the parent's suffix chain to the first state that can
                # be extended by this letter.
                fallback = parent.suffix
                while fallback and letter not in fallback.children:
                    fallback = fallback.suffix
                if fallback:
                    kid.suffix = fallback.children[letter]
                else:
                    kid.suffix = root
                # Inherit everything matched at the suffix node.  Time: O(p^2)
                kid.outputs.extend(kid.suffix.outputs)

        return root
[42]:
class StreamChecker2(object):
    """Report, letter by letter, whether the stream so far ends with a word."""

    def __init__(self, words):
        """
        Build the automaton for the given words.

        :type words: List[str]
        """
        self.__trie = AhoTrie(words)

    def query(self, letter):  # O(m) times
        """
        Feed one letter; True iff at least one word ends at this letter.

        :type letter: str
        :rtype: bool
        """
        matches = self.__trie.step(letter)
        return bool(matches)
[43]:
# Replay the stream from Example 1: only 'd', 'f' and 'l' complete a word.
s = StreamChecker2(["cd", "f", "kl"])
assert not s.query('a')
assert not s.query('b')
assert not s.query('c')
assert s.query('d')       # 'cd' is in the wordlist
assert not s.query('e')
assert s.query('f')       # 'f' is in the wordlist
assert not s.query('g')
assert not s.query('h')
assert not s.query('i')
assert not s.query('j')
assert not s.query('k')
assert s.query('l')       # 'kl' is in the wordlist